{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Note\n",
    "\n",
    "Please view the [README](https://github.com/eclipse/deeplearning4j-examples/blob/master/tutorials/README.md) to learn about installing, setting up dependencies, and importing notebooks in Zeppelin"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Background"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this tutorial, we will learn how to apply a long-short term memory (LSTM) neural network to a medical time series problem. The data used comes from 4000 intensive care unit (ICU) patients and the goal is to predict the mortality of patients using 6 general descriptor features, such as age, gender, and weight along with 37 sequential features, such as cholesterol level, temperature, pH, and glucose level. Each patient has multiple measurements of the sequential features, with patients having a different amount of measurements taken. Furthermore, the time between measurements also differ among patients as well. \n",
    "\n",
    "A LSTM is well suited for this type of problem due to the sequential nature of the data. In addition, LSTM networks avoid vanishing and exploding gradients and are able to effectively capture long term dependencies due to its cell state, a feature not present in typical recurrent networks. For a more in depth explanation of LSTM's, see https://deeplearning4j.org/lstm.html."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "import io.skymind.zeppelin.utils._\n",
    "import io.skymind.modelproviders.history.client.ModelHistoryClient\n",
    "import io.skymind.modelproviders.history.model._\n",
    "import org.datavec.api.records.reader.SequenceRecordReader;\n",
    "import org.datavec.api.records.reader.impl.csv.CSVSequenceRecordReader;\n",
    "import org.datavec.api.split.NumberedFileInputSplit;\n",
    "import org.deeplearning4j.datasets.datavec.SequenceRecordReaderDataSetIterator;\n",
    "import org.deeplearning4j.eval.ROC;\n",
    "import org.deeplearning4j.nn.api.OptimizationAlgorithm;\n",
    "import org.deeplearning4j.nn.conf.ComputationGraphConfiguration;\n",
    "import org.deeplearning4j.nn.conf.NeuralNetConfiguration;\n",
    "import org.deeplearning4j.nn.conf.Updater;\n",
    "import org.deeplearning4j.nn.conf.layers.GravesLSTM;\n",
    "import org.deeplearning4j.nn.conf.layers.RnnOutputLayer;\n",
    "import org.deeplearning4j.nn.graph.ComputationGraph;\n",
    "import org.deeplearning4j.optimize.listeners.ScoreIterationListener;\n",
    "import org.deeplearning4j.nn.conf.graph.rnn.LastTimeStepVertex;\n",
    "import org.deeplearning4j.nn.conf.layers.OutputLayer;\n",
    "import org.nd4j.linalg.api.ndarray.INDArray;\n",
    "import org.deeplearning4j.nn.weights.WeightInit;\n",
    "import org.nd4j.linalg.activations.Activation;\n",
    "import org.nd4j.linalg.dataset.api.DataSet;\n",
    "import org.nd4j.linalg.lossfunctions.LossFunctions;\n",
    "import org.nd4j.linalg.dataset.api.iterator.DataSetIterator;\n",
    "import org.nd4j.linalg.dataset.api.DataSetPreProcessor;\n",
    "import org.nd4j.linalg.factory.Nd4j;\n",
    "import org.nd4j.linalg.indexing.BooleanIndexing;\n",
    "import org.nd4j.linalg.indexing.NDArrayIndex;\n",
    "import org.nd4j.linalg.indexing.conditions.Conditions;\n",
    "import org.nd4j.linalg.primitives.Pair;\n",
    "import org.slf4j.Logger;\n",
    "import org.slf4j.LoggerFactory;\n",
    "\n",
    "import java.io.File;\n",
    "import org.apache.commons.io.FileUtils;\n",
    "import org.apache.commons.io.FilenameUtils;\n",
    "import java.io.IOException;\n",
    "import java.util.HashMap;\n",
    "import java.util.Arrays;\n",
    "import java.net.URL;\n",
    "import java.io.BufferedInputStream;\n",
    "import java.io.FileInputStream;\n",
    "import java.io.BufferedOutputStream;\n",
    "import java.io.FileOutputStream;\n",
    "import java.lang.Byte;\n",
    "\n",
    "import org.apache.commons.compress.archivers.tar.TarArchiveEntry;\n",
    "import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;\n",
    "import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now that we have imported everything needed to run this tutorial, we will start with obtaining the data and then converting the  data into a format a neural network can understand. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Data Source"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The data is contained in a compressed tar.gz file. We will have to download the data from the url below and then extract csv files containing the ICU data. Each patient will have a separate csv file for the features and labels. The features will be contained in a directory called sequence and the labels will be contained in a directory called mortality. The features are contained in a single csv file with the columns representing the features and the rows representing different time steps. The labels are contained in a single csv file which contains a value of 0 indicating death and a value of 1 indicating survival."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val DATA_URL = \"https://skymindacademy.blob.core.windows.net/physionet2012/physionet2012.tar.gz\"\n",
    "val DATA_PATH = FilenameUtils.concat(System.getProperty(\"java.io.tmpdir\"), \"dl4j_physionet/\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Download Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To download the data, we will create a temporary directory that will store the data files, extract the tar.gz file from the url, and place it in the specified directory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val directory = new File(DATA_PATH)\n",
    "directory.mkdir() // create new directory at specified path\n",
    "\n",
    "val archizePath = DATA_PATH + \"physionet2012.tar.gz\" // set path for tar.gz file\n",
    "val archiveFile = new File(archizePath) // create tar.gz file\n",
    "val extractedPath = DATA_PATH + \"physionet2012\" \n",
    "val extractedFile = new File(extractedPath)\n",
    "\n",
    "FileUtils.copyURLToFile(new URL(DATA_URL), archiveFile) // copy data from URL to file"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " \n",
    "\n",
    "Next, we must extract the data from the tar.gz file, recreate directories within the tar.gz file into our temporary directory, and copy the files into our temporary directory. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "var fileCount = 0\n",
    "var dirCount = 0\n",
    "val BUFFER_SIZE = 4096\n",
    "\n",
    "val tais = new TarArchiveInputStream(new GzipCompressorInputStream( new BufferedInputStream( new FileInputStream(archizePath))))\n",
    "\n",
    "var entry = tais.getNextEntry().asInstanceOf[TarArchiveEntry]\n",
    "\n",
    "while(entry != null){\n",
    "    if (entry.isDirectory()) {\n",
    "        new File(DATA_PATH + entry.getName()).mkdirs()\n",
    "        dirCount = dirCount + 1\n",
    "        fileCount = 0\n",
    "    }\n",
    "    else {\n",
    "        \n",
    "        val data = new Array[scala.Byte](4 * BUFFER_SIZE)\n",
    "\n",
    "        val fos = new FileOutputStream(DATA_PATH + entry.getName());\n",
    "        val dest = new BufferedOutputStream(fos, BUFFER_SIZE);\n",
    "        var count = tais.read(data, 0, BUFFER_SIZE)\n",
    "        \n",
    "        while (count != -1) {\n",
    "            dest.write(data, 0, count)\n",
    "            count = tais.read(data, 0, BUFFER_SIZE)\n",
    "        }\n",
    "        \n",
    "        dest.close()\n",
    "        fileCount = fileCount + 1\n",
    "    }\n",
    "    if(fileCount % 1000 == 0){\n",
    "        print(\".\")\n",
    "    }\n",
    "    \n",
    "    entry = tais.getNextEntry().asInstanceOf[TarArchiveEntry]\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### DataSetIterators"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Our next goal is to convert the raw data (csv files) into a DataSetIterator, which can then be fed into a neural network for training. Our training data will have 3200 examples which will be represented by a single DataSetIterator, and the testing data will have 800 examples which will be represented by a separate DataSet Iterator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val NB_TRAIN_EXAMPLES = 2000 // number of training examples\n",
    "val NB_TEST_EXAMPLES = 800 // number of testing examples"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order to obtain DataSetIterators, we must first initialize CSVSequenceRecordReaders, which will parse the raw data into record-like format. We will first set the directories for the features and labels and initialize the CSVSequenceRecordReaders.\n",
    "\n",
    "Next, we can initialize the SequenceRecordReaderDataSetIterator using the previously created CSVSequenceRecordReaders. We will use an alignment mode of ALIGN_END. This alignment mode is needed due to the fact that the number of time steps differs between different patients. Because the mortality label is always at the end of the sequence, we need all the sequences aligned so that the time step with the mortality label is the last time step for all patients. For a more in depth explanation of alignment modes, see https://deeplearning4j.org/usingrnns. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val path = FilenameUtils.concat(DATA_PATH, \"physionet2012/\") // set parent directory\n",
    "\n",
    "val featureBaseDir = FilenameUtils.concat(path, \"sequence\") // set feature directory\n",
    "val mortalityBaseDir = FilenameUtils.concat(path, \"mortality\") // set label directory\n",
    "\n",
    "// Load training data\n",
    "\n",
    "val trainFeatures = new CSVSequenceRecordReader(1, \",\")\n",
    "trainFeatures.initialize( new NumberedFileInputSplit(featureBaseDir + \"/%d.csv\", 0, NB_TRAIN_EXAMPLES - 1))\n",
    "\n",
    "val trainLabels = new CSVSequenceRecordReader()\n",
    "trainLabels.initialize(new NumberedFileInputSplit(mortalityBaseDir + \"/%d.csv\", 0, NB_TRAIN_EXAMPLES - 1))\n",
    "\n",
    "val trainData = new SequenceRecordReaderDataSetIterator(trainFeatures, trainLabels,\n",
    "                1, 2, false, SequenceRecordReaderDataSetIterator.AlignmentMode.ALIGN_END)\n",
    "\n",
    "        \n",
    "// Load testing data\n",
    "val testFeatures = new CSVSequenceRecordReader(1, \",\");\n",
    "testFeatures.initialize(new NumberedFileInputSplit(featureBaseDir + \"/%d.csv\", NB_TRAIN_EXAMPLES, NB_TRAIN_EXAMPLES + 50));\n",
    "       \n",
    "val testLabels = new CSVSequenceRecordReader();\n",
    "testLabels.initialize(new NumberedFileInputSplit(mortalityBaseDir + \"/%d.csv\", NB_TRAIN_EXAMPLES, NB_TRAIN_EXAMPLES  + 50));\n",
    "\n",
    "val testData =  new SequenceRecordReaderDataSetIterator(testFeatures, testLabels,\n",
    "                1, 2, false, SequenceRecordReaderDataSetIterator.AlignmentMode.ALIGN_END)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Neural Network Configuration"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we can finally configure and then initialize the neural network for this problem. We will be using the ComputationGraph class of DL4J."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "// Set neural network parameters\n",
    "val NB_INPUTS = 86\n",
    "val NB_EPOCHS = 10\n",
    "val RANDOM_SEED = 1234\n",
    "val LEARNING_RATE = 0.005\n",
    "val BATCH_SIZE = 32\n",
    "val LSTM_LAYER_SIZE = 200\n",
    "val NUM_LABEL_CLASSES = 2 "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val conf = new NeuralNetConfiguration.Builder()\n",
    "                .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)\n",
    "                .l2(0.01)\n",
    "                .graphBuilder()\n",
    "                .addInputs(\"in\")\n",
    "                .addLayer(\"lstm\", new GravesLSTM.Builder().nIn(NB_INPUTS).nOut(30).build(), \"in\")\n",
    "                .addVertex(\"lastStep\", new LastTimeStepVertex(\"in\"), \"lstm\")\n",
    "                .addLayer(\"out\", new OutputLayer.Builder().activation(Activation.SOFTMAX).nIn(30).nOut(2)\n",
    "                        .build(), \"lastStep\")\n",
    "                .setOutputs(\"out\")\n",
    "                .build();\n",
    "\n",
    "\n",
    "val model = new ComputationGraph(conf);\n",
    "model.init();"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Training"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before we train the neural network, we first need to preprocess the data so that only the last step of the labels array is used for training. Thus, we will define the LastStepPreProc class to do this, which is an extension of the DataSetPreProcessor."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "class LastStepPreProc extends DataSetPreProcessor {\n",
    "\n",
    "    override def preProcess(in : DataSet) {\n",
    "\n",
    "        val origLabels = in.getLabels();\n",
    "        val lMask = in.getLabelsMaskArray();\n",
    "\n",
    "        //On master: use TimeSeriesUtils.pullLastTimeSteps(origLabels, lMask);\n",
    "        \n",
    "        val labels2d = pullLastTimeSteps(origLabels, lMask);\n",
    "      \n",
    "        in.setLabels(labels2d);\n",
    "        in.setLabelsMaskArray(null);\n",
    "    }\n",
    "\n",
    "     def pullLastTimeSteps( pullFrom : INDArray, mask : INDArray) : INDArray = {\n",
    "        if (mask == null) {\n",
    "            //No mask array -> extract same (last) column for all\n",
    "            var lastTS = pullFrom.size(2) - 1;\n",
    "            var out = pullFrom.get(NDArrayIndex.all(), NDArrayIndex.all(), NDArrayIndex.point(lastTS));\n",
    "            var fwdPassTimeSteps = null; //Null -> last time step for all examples\n",
    "            out\n",
    "            \n",
    "        } else {\n",
    "            var outShape = new Array[Double](2);\n",
    "            outShape(0) = pullFrom.size(0);\n",
    "            outShape(1) = pullFrom.size(1);\n",
    "                \n",
    "            var out = Nd4j.create(outShape);\n",
    "\n",
    "            //Want the index of the last non-zero entry in the mask array\n",
    "            var lastStepArr = BooleanIndexing.lastIndex(mask, Conditions.epsNotEquals(0.0), 1);\n",
    "            var fwdPassTimeSteps = lastStepArr.data().asInt();\n",
    "            \n",
    "            for ( i <- 0 to fwdPassTimeSteps.length-1) {\n",
    "                out.putRow(i, pullFrom.get(NDArrayIndex.point(i), NDArrayIndex.all(),\n",
    "                        NDArrayIndex.point(fwdPassTimeSteps(i))));\n",
    "            }\n",
    "            out\n",
    "        }\n",
    "    }\n",
    "}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " \n",
    "\n",
    "To actually train the neural network, we use a for loop for the number of epochs to train. We then extract each DataSet, preprocess it, and fit it to the model."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val proc = new LastStepPreProc()\n",
    "\n",
    "trainData.reset()\n",
    "\n",
    "for( i <- 1 to 5){\n",
    "    println(\"Epoch:\")\n",
    "    println(i)\n",
    "    while(trainData.hasNext()){\n",
    "        val batch = trainData.next()\n",
    "        proc.preProcess(batch) \n",
    "        model.fit(batch)\n",
    "    }\n",
    "    trainData.reset()\n",
    "}\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " \n",
    "\n",
    "### Adding Model to SKIL Experiment\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To finally add the model to the SKIL experiment, we will initialize a skilContext and use the addModelToExperiment method."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "val skilContext = new SkilContext()\n",
    "val client = skilContext.client\n",
    "val model_id = skilContext.addModelToExperiment(z, model)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We then initialize an Evaluation class in order to evalute how well our model performs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "testData.reset()\n",
    "val eval = new Evaluation(2);"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can then evaluate our test set using a while loop."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "while(testData.hasNext()){\n",
    "    val batch = testData.next()\n",
    "    proc.preProcess(batch) \n",
    "    val myOutput = model.output(batch.getFeatures())\n",
    "    eval.eval(batch.getLabels(),myOutput(0))\n",
    "}\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally we add the evaluation to the model using the addEvaluationToModel method of the skilContext."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "autoscroll": "auto"
   },
   "outputs": [],
   "source": [
    "skilContext.addEvaluationToModel(z, model_id, eval)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Spark 2.0.0 - Scala 2.11",
   "language": "scala",
   "name": "spark2-scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "text/x-scala",
   "name": "scala",
   "pygments_lexer": "scala",
   "version": "2.11.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
